# data_processing/traffic_processor.py
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, date_format, weekofyear, explode, array


class TrafficDataProcessor:
    """Stateless Spark DataFrame transformations for traffic data."""

    @staticmethod
    def preprocess_data(df: DataFrame) -> DataFrame:
        """Add calendar features derived from the ``date`` column.

        Appended columns, in this order:

        * ``week``         -- ``weekofyear("date")``
        * ``weekday``      -- ``date`` formatted with the pattern ``"E"``
        * ``month``        -- ``date`` formatted with the pattern ``"MMMM"``
        * ``day_of_month`` -- ``date`` formatted with the pattern ``"d"``

        The three ``date_format`` columns are strings, not numbers.

        Args:
            df: Input DataFrame; it must expose a ``date`` column.

        Returns:
            The enriched DataFrame, with ``cache()`` already called on it.
        """
        # Order matters: it fixes the column order of the resulting schema.
        time_features = (
            ("week", weekofyear("date")),
            ("weekday", date_format("date", "E")),
            ("month", date_format("date", "MMMM")),
            ("day_of_month", date_format("date", "d")),
        )

        enriched = df
        for feature_name, feature_expr in time_features:
            enriched = enriched.withColumn(feature_name, feature_expr)

        return enriched.cache()

    @staticmethod
    def extract_top_products(df: DataFrame) -> DataFrame:
        """Count how often each product occurs across the top-product columns.

        The values of ``top_product_1``, ``top_product_2`` and
        ``top_product_3`` are flattened into a single ``product`` column (one
        output row per source column per input row) and then tallied.

        NOTE(review): null entries in those columns appear to be exploded too
        and would be counted as their own group -- confirm whether they should
        be filtered out upstream.

        Args:
            df: Input DataFrame containing the three ``top_product_*`` columns.

        Returns:
            A DataFrame with columns ``product`` and ``count``, sorted by
            ``count`` in descending order.
        """
        product_slots = array("top_product_1", "top_product_2", "top_product_3")
        flattened = df.select(explode(product_slots).alias("product"))
        tallies = flattened.groupBy("product").count()
        return tallies.orderBy("count", ascending=False)